Fork me on GitHub

【Java多线程】JUC锁 10. Semaphore

Semaphore

1. 前言

  • Semaphore是一个计数信号量,它的本质是一个共享锁。
  • 线程可以通过调用acquire()来获取信号量的许可;当信号量中有可用的许可时,线程能获取该许可;否则线程必须等待,直到有可用的许可为止。 线程可以通过release()来释放它所持有的信号量许可。
  • Semaphore包含Sync对象,有公平信号量和非公平信号量之分

2. 源码解析

2.1 数据结构

同ReentrantLock一样,Semaphore包含Sync对象,有公平信号量和非公平信号量之分,默认非公平信号量

2.2 内部类

  • FairSync
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;

FairSync(int permits) {
super(permits);
}

protected int tryAcquireShared(int acquires) {
for (;;) {
// 判断“当前线程”是不是CLH队列中的第一个线程线程,
// 若是的话,则返回-1。
if (hasQueuedPredecessors())
return -1;
// 可以获得的信号量的许可数
int available = getState();
// 获得acquires个信号量许可之后,剩余的信号量许可数
int remaining = available - acquires;
// 如果“剩余的信号量许可数>=0”,则设置“可以获得的信号量许可数”为remaining。
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
}
  • NonfairSync
1
2
3
4
5
6
7
8
9
10
11
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;

NonfairSync(int permits) {
super(permits);
}

protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}

nonfairTryAcquireShared 在Sync中定义:

1
2
3
4
5
6
7
8
9
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}

公平信号量和非公平信号量的获取信号量机制不同:对于公平信号量而言,如果当前线程不在CLH队列的头部,则排队等候;而对于非公平信号量而言,无论当前线程是不是在CLH队列的头部,它都会直接获取信号量。

而释放信号量的机制是相同的。

2.3 核心方法

  • Semaphore()构造方法
1
2
3
4
5
6
public Semaphore(int permits) {
sync = new NonfairSync(permits); //默认非公平锁
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

是通过Sync来设置锁计数

1
2
3
Sync(int permits) {
setState(permits);
}
  • acquire()信号量获取
1
2
3
4
5
6
7
8
9
10
11
// 从信号量获取给定数目的许可,在提供这些许可前一直将线程阻塞,或者线程已被中断。
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}

// 从信号量获取给定数目的许可,在提供这些许可前一直将线程阻塞
public void acquireUninterruptibly(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireShared(permits);
}

实际上是调用的AQS中的acquireSharedInterruptibly()。

1
2
3
4
5
6
7
8
9
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
// 如果线程是中断状态,则抛出异常。
if (Thread.interrupted())
throw new InterruptedException();
// 否则,尝试获取“共享锁”;获取成功则直接返回,获取失败,则通过doAcquireSharedInterruptibly()获取。
if (tryAcquireShared(arg) < 0) //公平信号量和非公平信号量区别
doAcquireSharedInterruptibly(arg);
}

doAcquireSharedInterruptibly()使当前线程等待,直到获取共享锁或被中断才返回

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private void doAcquireSharedInterruptibly(long arg)
throws InterruptedException {
// 创建”当前线程“的Node节点,且Node中记录的锁是”共享锁“类型;并将该节点添加到CLH队列末尾。
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
// 获取上一个节点。
// 如果上一节点是CLH队列的表头,则”尝试获取共享锁“。
final Node p = node.predecessor();
if (p == head) {
long r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// 当前线程一直等待,直到获取到共享锁。
// 如果线程在等待过程中被中断过,则再次中断该线程(还原之前的中断状态)。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
  • release()信号量释放
1
2
3
4
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}

releaseShared()方法在AQS中定义

1
2
3
4
5
6
7
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) { //尝试释放共享锁
doReleaseShared(); //释放共享锁
return true;
}
return false;
}

tryReleaseShared()在Sync中定义,公平信号量和非公平信号量机制相同

1
2
3
4
5
6
7
8
9
10
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState(); //当前可获取的信号量数
int next = current + releases; //释放releases个信号后,信号量数
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next)) //cas改变信号量state
return true;
}
}

doReleaseShared()释放共享锁。它会从前往后的遍历CLH队列,依次“唤醒”然后“执行”队列中每个节点对应的线程;最终的目的是让这些线程释放它们所持有的信号量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private void doReleaseShared() {
for (;;) {
// 获取CLH队列的头节点
Node h = head;
// 如果头节点不为null,并且头节点不等于tail节点。
if (h != null && h != tail) {
// 获取头节点对应的线程的状态
int ws = h.waitStatus;
// 如果头节点对应的线程是SIGNAL状态,则意味着“头节点的下一个节点所对应的线程”需要被unpark唤醒。
if (ws == Node.SIGNAL) {
// 设置“头节点对应的线程状态”为空状态。失败的话,则继续循环。
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
// 唤醒“头节点的下一个节点所对应的线程”。
unparkSuccessor(h);
}
// 如果头节点对应的线程是空状态,则设置“头节点对应的线程所拥有的共享锁”为其它线程获取锁的空状态。
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// 如果头节点发生变化,则继续循环。否则,退出循环。
if (h == head) // loop if head changed
break;
}
}

3. 参考

http://www.cnblogs.com/skywang12345/p/3534050.html